Skip to content

Send WS messages in parallel and reuse serialized data#1425

Merged
karashiiro merged 1 commit intoUniversalis-FFXIV:v2from
karashiiro:fix/ws-fanout
Dec 30, 2025
Merged

Send WS messages in parallel and reuse serialized data#1425
karashiiro merged 1 commit intoUniversalis-FFXIV:v2from
karashiiro:fix/ws-fanout

Conversation

@karashiiro
Copy link
Member

@karashiiro karashiiro commented Dec 30, 2025

  • Uses Parallel.ForEach to send WS messages in parallel instead of allowing them to fan out linearly with the number of connected clients
  • Reuses serialized bytes for each message between clients for performance, as the serialized message is guaranteed to be the same between them

Summary by CodeRabbit

  • Tests

    • Added comprehensive unit tests for real-time message serialization and processor functionality.
  • Bug Fixes

    • Improved error handling in message distribution to ensure a single client failure does not interrupt delivery to other clients.

✏️ Tip: You can customize this high-level summary in your review settings.

@karashiiro karashiiro added the enhancement New feature or request label Dec 30, 2025
@coderabbitai
Copy link

coderabbitai bot commented Dec 30, 2025

📝 Walkthrough

Walkthrough

This PR introduces message serialization caching and parallel message dispatch optimization. A caching mechanism stores pre-serialized bytes on SocketMessage instances for reuse. The message dispatch in SocketProcessor shifts from synchronous iteration to Parallel.ForEach with per-connection error handling. SocketClient integrates the cached serialization directly. Comprehensive tests validate caching behavior and parallel dispatch resilience.

Changes

Cohort / File(s) Summary
Message Caching Mechanism
src/Universalis.Application/Realtime/Messages/SocketMessage.cs
Added internal CachedSerializedBytes property and GetSerializedBytes(pool) method to support pre-serialization and caching of BSON-serialized message bytes. Returns cached bytes if available; otherwise serializes fresh and optionally caches.
Dispatch Parallelization
src/Universalis.Application/Realtime/SocketProcessor.cs
Replaced synchronous foreach with Parallel.ForEach for concurrent connection dispatch. Serializes message once before parallel loop. Each push wrapped in try/catch with logging and exception counting. Added MaxDegreeOfParallelism hint.
Client Integration
src/Universalis.Application/Realtime/SocketClient.cs
Simplified SendEvent to call message.GetSerializedBytes(MemoryStreamPool) and send in single WebSocketMessage instead of chunked BSON streaming. Removed manual serialization loop logic.
Test Suite: Message Caching
src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs
Added five unit tests validating cached byte return, fresh serialization, BSON validity, cache consistency, and fallback serialization for SubscribeFailure events.
Test Suite: Processor Dispatch
src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs
Added two tests: one verifying CachedSerializedBytes population during Publish, another confirming Parallel.ForEach resilience when a single client throws (good clients still receive while bad client is logged).
Project Configuration
src/Universalis.Application/Universalis.Application.csproj
Added InternalsVisibleTo entry for Universalis.Application.Tests to grant test assembly access to internal caching members.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~23 minutes

Possibly related PRs

Poem

🐰 Bytes now cache where once they'd fly,
Parallel paths race through the sky,
One serialize, send to all—
Fast and fair, they heed the call!
💾✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 7.14% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately and specifically describes the main objectives of the changeset: parallelizing WebSocket message dispatch and implementing serialization caching to reuse bytes across clients.
✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/Universalis.Application/Realtime/SocketProcessor.cs (1)

43-49: Consider adding a writer flush before reading the stream.

The BsonBinaryWriter may buffer data internally. While MongoDB's implementation typically flushes on dispose, explicitly flushing before ToArray() would be more defensive.

🔎 Proposed fix
 private static byte[] SerializeMessage(SocketMessage message)
 {
     using var stream = MemoryStreamPool.GetStream();
     using var writer = new BsonBinaryWriter(stream);
     BsonSerializer.Serialize(writer, message.GetType(), message);
+    writer.Flush();
     return stream.ToArray();
 }
src/Universalis.Application/Realtime/Messages/SocketMessage.cs (1)

28-37: Implementation is correct; consider consistency with SocketProcessor.SerializeMessage.

The on-the-fly serialization path here mirrors SocketProcessor.SerializeMessage. Both could benefit from an explicit writer.Flush() before ToArray() for defensive coding.

Also, there's minor duplication between this fallback path and SerializeMessage in SocketProcessor. If this becomes a maintenance concern, consider extracting a shared serialization helper.

🔎 Proposed fix for flush
 internal ReadOnlyMemory<byte> GetSerializedBytes(RecyclableMemoryStreamManager pool)
 {
     if (CachedSerializedBytes != null)
         return CachedSerializedBytes;

     using var stream = pool.GetStream();
     using var writer = new BsonBinaryWriter(stream);
     BsonSerializer.Serialize(writer, GetType(), this);
+    writer.Flush();
     return stream.ToArray();
 }
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 5f9b0d7 and ae20571.

📒 Files selected for processing (6)
  • src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs
  • src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs
  • src/Universalis.Application/Realtime/Messages/SocketMessage.cs
  • src/Universalis.Application/Realtime/SocketClient.cs
  • src/Universalis.Application/Realtime/SocketProcessor.cs
  • src/Universalis.Application/Universalis.Application.csproj
🧰 Additional context used
🧬 Code graph analysis (3)
src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs (3)
src/Universalis.Application/Realtime/SocketProcessor.cs (2)
  • SocketProcessor (18-95)
  • Publish (51-77)
src/Universalis.Application/Realtime/ISocketClient.cs (1)
  • Push (14-14)
src/Universalis.Application/Realtime/Messages/SocketMessage.cs (2)
  • SocketMessage (9-38)
  • SocketMessage (20-23)
src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs (1)
src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs (5)
  • Fact (20-35)
  • Fact (37-72)
  • Fact (74-98)
  • Fact (100-128)
  • Fact (130-156)
src/Universalis.Application/Realtime/SocketProcessor.cs (2)
src/Universalis.Application/Realtime/Messages/SocketMessage.cs (2)
  • SocketMessage (9-38)
  • SocketMessage (20-23)
src/Universalis.Application/Realtime/ISocketClient.cs (1)
  • Push (14-14)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Codacy Static Code Analysis
  • GitHub Check: Analyze (csharp)
🔇 Additional comments (7)
src/Universalis.Application/Realtime/SocketClient.cs (1)

269-273: LGTM! Clean simplification of the send logic.

The refactored SendEvent now correctly leverages the new GetSerializedBytes API which handles both cached (from SocketProcessor.Publish) and on-the-fly serialization (for per-client messages like SubscribeFailure). The single SendAsync call with EndOfMessage is cleaner than the previous streaming loop.

src/Universalis.Application.Tests/Realtime/Messages/SocketMessageTests.cs (2)

29-41: Good test coverage for serialization paths.

The tests comprehensively cover the caching mechanism. Minor note: Lines 39-40 have slightly redundant assertions (Assert.False(result.IsEmpty) followed by Assert.True(result.Length > 0)), but this doesn't affect correctness.


74-87: Good coverage for the fallback serialization path.

This test correctly validates that SubscribeFailure messages (sent per-client without pre-caching) serialize properly via the on-the-fly path.

src/Universalis.Application/Realtime/SocketProcessor.cs (1)

51-77: Solid parallel dispatch implementation with proper error isolation.

The approach correctly:

  • Serializes once before dispatch (line 56)
  • Uses Parallel.ForEach with bounded parallelism
  • Isolates per-connection failures with try/catch
  • Tracks exceptions via counter

One observation: Publish mutates the incoming message by setting CachedSerializedBytes. This is fine for the current usage pattern but could be surprising if a caller ever reuses a message object across multiple Publish calls with different content expectations.

src/Universalis.Application/Universalis.Application.csproj (1)

8-10: LGTM! Standard pattern for test access to internals.

Exposing internals to the test assembly is the correct approach for testing internal caching behavior without polluting the public API.

src/Universalis.Application.Tests/Realtime/SocketProcessorTests.cs (2)

20-35: Good test for caching side effect.

This test correctly verifies that Publish populates CachedSerializedBytes on the message, which is the core optimization of this PR.


37-72: Important test for error resilience in parallel dispatch.

This test validates that a failing client doesn't prevent other clients from receiving messages—a critical property for the Parallel.ForEach implementation. The verification that badClient.Push was still called (line 71) confirms the exception was thrown and handled gracefully.

@karashiiro karashiiro merged commit 1ea6e64 into Universalis-FFXIV:v2 Dec 30, 2025
4 of 5 checks passed
@karashiiro karashiiro deleted the fix/ws-fanout branch December 30, 2025 22:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant